package com.kool.kmqtt.server;

import com.alibaba.fastjson.JSON;
import com.kool.kmqtt.server.constant.PacketTypeEnum;
import com.kool.kmqtt.server.encoder.PacketEncoder;
import com.kool.kmqtt.server.log.SendLog;
import com.kool.kmqtt.service.TopicConstant;
import com.kool.kmqtt.server.packet.Packet;
import com.kool.kmqtt.server.packet.PublishVariableHeader;
import com.kool.kmqtt.server.session.SessionContext;
import com.kool.kmqtt.service.KafkaProvider;
import com.kool.kmqtt.util.DateUtil;
import com.kool.kmqtt.util.SpringUtil;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;

import java.util.Date;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * 报文发送类
 */
@Slf4j
public class PacketSender {
    /**
     * 报文编码器
     */
    private final PacketEncoder packetEncoder;
    /**
     * 会话上下文
     */
    private final SessionContext sessionContext;

    private static final LinkedBlockingQueue<SendContext> sendQueue = new LinkedBlockingQueue<>();

    static {
        new Thread(() -> {
            try {
                while (true) {
                    SendLog sendLog = new SendLog();
                    sendLog.setTimestamp(DateUtil.dateString(new Date()));
                    try {
                        //出队列，队列为空时阻塞
                        SendContext sendContext = sendQueue.take();
                        Packet packet = sendContext.packet;
                        int packetType = packet.getFixedHeader().getPacketType();
                        String clientId = sendContext.packetSender.sessionContext.getClientId();
                        String remoteAddress = sendContext.packetSender.sessionContext.getRemoteAddress();
                        String userName = sendContext.packetSender.sessionContext.getUserName();
                        sendLog.setClientId(clientId)
                                .setRemoteAddress(remoteAddress)
                                .setUserName(userName)
                                .setPacketType(Integer.toString(packetType));
                        if (packetType == PacketTypeEnum.PUBLISH.getCode()) {
                            //取PUBLISH报文的主题
                            PublishVariableHeader publishVariableHeader = (PublishVariableHeader) packet.getVariableHeader();
                            sendLog.setTopicName(publishVariableHeader.getTopicName());
                        }
                        log.debug("往客户端[{}]发送[{}]报文,ID=[{}]：{}",
                                clientId,
                                PacketTypeEnum.getDesc(packet.getFixedHeader().getPacketType()),
                                packet.getPacketId(),
                                JSON.toJSONString(packet));
                        ChannelHandlerContext ctx = sendContext.packetSender.sessionContext.getCtx().get();
                        byte[] bytes = sendContext.packetSender.packetEncoder.encode(packet);
                        final ByteBuf byteBuf = ctx.alloc().buffer(bytes.length);
                        byteBuf.writeBytes(bytes);
                        //发送报文
                        ctx.channel().writeAndFlush(byteBuf);
                        sendLog.setIsSuccess("1");
                    } catch (Exception e) {
                        log.error(e.getMessage(), e);
                        sendLog.setIsSuccess("0");
                    } finally {
                        if (ServerConfig.getInstance().getLogAnalysisSwitch()) {
                            //推送日志到kafka，日志统计分析
                            try {
                                KafkaProvider kafkaProvider = SpringUtil.getBean(KafkaProvider.class);
                                kafkaProvider.sendToKafka(TopicConstant.TOPIC_SUFFIX_SEND_LOG, JSON.toJSONString(sendLog));
                            } catch (Exception e) {
                                log.error(e.getMessage(), e);
                            }
                        }
                    }
                }
            } catch (Throwable t) {
                log.error(t.getMessage(), t);
            }
        }).start();
    }

    /**
     * @param sessionContext 接收方的会话上下文
     * @param packetEncoder  报文编码器实例
     */
    public PacketSender(SessionContext sessionContext, PacketEncoder packetEncoder) {
        this.sessionContext = sessionContext;
        this.packetEncoder = packetEncoder;
    }

    /**
     * 发送报文
     *
     * @param packet
     */
    public void send(Packet packet) {
        SendContext sendContext = new SendContext();
        sendContext.packet = packet;
        sendContext.packetSender = this;
        sendQueue.offer(sendContext);
    }

    class SendContext {
        private PacketSender packetSender;
        private Packet packet;
    }
}
